import com.gfscold.trans.common.app.BaseSQLApp;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DwsTransOrder extends BaseSQLApp {
    @Override
    public void handle(StreamTableEnvironment tableEnv, StreamExecutionEnvironment env, String groupId) {

        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS dwd_trans_order_device (\n" +
                "    `transportation_order_number` STRING COMMENT '运单号',\n" +
                "    `create_time` TIMESTAMP COMMENT '创建日期',\n" +
                "    `device_code` STRING COMMENT '设备编号',\n" +
                "    `device_name` STRING COMMENT '设备名称',\n" +
                "    `platform_code` STRING COMMENT '平台代码',\n" +
                "    `platform_name` STRING COMMENT '平台名称',\n" +
                "    `type` SMALLINT COMMENT '类型（1 固定式设备 2 移动式设备）',\n" +
                "    `index_num` SMALLINT COMMENT '序号',\n" +
                "    `device_data_start_time` TIMESTAMP COMMENT '设备数据开始时间',\n" +
                "    `device_data_end_time` TIMESTAMP COMMENT '设备数据结束时间',\n" +
                "    `company_id` INT COMMENT '所属公司',\n" +
                "    `company_cn_name` STRING COMMENT '公司名称',\n" +
                "    `execute_company_id` INT COMMENT '下级执行公司 ',\n" +
                "    `execute_company_cn_name` STRING COMMENT '执行公司名称',\n" +
                "    `carrier_id` INT COMMENT '承运商',\n" +
                "    `plan_dispatch_date` STRING COMMENT '计划发车时间',\n" +
                "    `actual_dispatch_date` STRING COMMENT '实际发车时间',\n" +
                "    `vehicle_id` INT COMMENT '车辆',\n" +
                "    `vehicle_num` STRING COMMENT '车牌号',\n" +
                "    `driver` STRING COMMENT '司机',\n" +
                "    `phone` STRING COMMENT '手机号',\n" +
                "    `remark` STRING COMMENT '备注',\n" +
                "    `operate_status` INT NOT NULL COMMENT '操作状态',\n" +
                "    `status` BOOLEAN COMMENT '状态',\n" +
                "    `settlement_channel_value` INT NOT NULL COMMENT '结算渠道',\n" +
                "    `settlement_channel_service_rate` DECIMAL(5,4) COMMENT '结算服务费率',\n" +
                "    `carrier_code` STRING COMMENT '承运商',\n" +
                "    `carrier_name` STRING COMMENT '承运商',\n" +
                "    `gps_code` STRING COMMENT 'gps设备号',\n" +
                "    `actual_vehicle_type` STRING COMMENT '实际订单车型（调度实际指定）',\n" +
                "    `driver_id` INT COMMENT '司机id',\n" +
                "    `original_order_operate_status` INT COMMENT '取消前原始状态',\n" +
                "    `source` INT COMMENT '来源 0手工录入，1导入，2 EDI',\n" +
                "    `vehicle_type` STRING COMMENT '车型（对应车辆基础数据）',\n" +
                "    `plan_carrier_id` INT COMMENT '计划承运商',\n" +
                "    `plan_carrier_code` STRING COMMENT '计划承运商',\n" +
                "    `plan_carrier_name` STRING COMMENT '计划承运商',\n" +
                "    `vehicle_time` STRING COMMENT '排车时间',\n" +
                "    `issue_time` STRING COMMENT '下发时间',\n" +
                "    `earliest_delivery_time` STRING COMMENT '最早提货时间',\n" +
                "    `latest_signing_time` STRING COMMENT '最晚签收时间',\n" +
                "    `max_temperature` FLOAT COMMENT '温控最高温度',\n" +
                "    `second_temperature` FLOAT COMMENT '第二高温度',\n" +
                "    `third_temperature` FLOAT COMMENT '第三高温度',\n" +
                "    `lowest_temperature` FLOAT COMMENT '最低温度',\n" +
                "    `order_type` INT NOT NULL COMMENT '订单业务类型',\n" +
                "    `course_sort` STRING COMMENT '路线',\n" +
                "    `project_id` INT COMMENT '项目ID',\n" +
                "    PRIMARY KEY ( `transportation_order_number`, `create_time`, `device_code`) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'doris',\n" +
                "  'fenodes' = '172.19.2.250:8030',\n" +
                "  'table.identifier' = 'test_dw.dwd_trans_order_device',\n" +
                "  'username' = 'zhhuang4',\n" +
                "  'password' = 'HZfR1cqrRcZC^',\n" +
                "  'sink.properties.read_json_by_line' = 'true',\n" +
                "  'sink.properties.format' = 'json',\n" +
                "  'sink.properties.column_separator' = ', ')" );
        tableEnv.executeSql("select * from dwd_trans_order_device").print();
    }

    public static void main(String[] args) {
        new DwsTransOrder().start(8080, 1, "dws_trans_order_ck_group_id");
    }
}
